package com.denghq.projectbuilder.component.msgbus.receiver.impl;

import com.denghq.projectbuilder.component.msgbus.receiver.IMsgReceiver;
import com.denghq.projectbuilder.component.msgbus.bus.IMsgProcessor;
import com.denghq.projectbuilder.component.msgbus.util.RabbitMqTopicUtil;
import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.util.CollectionUtils;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class RabbitMqMsgReceiver implements IMsgReceiver {

    private static final int DEFAULT_MAX_MSG_PROCESS_THREAD_NUM = 10; //消息处理线程最大数量
    private static final int MAX_CONCURRENT_CONSUMERS = 1;//消费者最大数量
    private static final int CONCURRENT_CONSUMERS = 1; //消费者数量

    // 配置相关
    /**
     * 消息处理线程最大数量,默认10
     */
    @Setter
    @Getter
    private int maxMsgProcessThreadNum = DEFAULT_MAX_MSG_PROCESS_THREAD_NUM;


    /**
     * 消费者最大数量，默认1
     */
    @Setter
    @Getter
    private int maxConcurrentConsumers = MAX_CONCURRENT_CONSUMERS;

    /**
     * 初始启用消费者数量，默认1
     */
    @Setter
    @Getter
    private int concurrentConsumers = CONCURRENT_CONSUMERS;

    /**
     * 需绑定的路由键
     */
    @Getter
    @Setter
    private List<BindRoutingKey> bindRoutingKeyList;

    /**
     * 需声明的队列
     */
    @Getter
    @Setter
    private List<Queue> declareQueueList;


    /**
     * 需声明的交换机
     */
    @Getter
    @Setter
    private List<Exchange> declareExchangeList;

    @Getter
    @Setter
    /**
     * 消费的队列
     */
    private String[] consumeQueueList;

    private SimpleMessageListenerContainer container;
    @Getter
    private Map<String, List<IMsgProcessor>> topicObserverMap;
    private ExecutorService executor;

    private ConnectionFactory connectionFactory;

    public RabbitMqMsgReceiver(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
        this.topicObserverMap = Maps.newConcurrentMap();
        this.executor = Executors.newFixedThreadPool(this.maxMsgProcessThreadNum);
    }

    private void init() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        //注意顺序
        initDeclareExchange(rabbitAdmin);
        initDeclareQueue(rabbitAdmin);
        initBindRoutingKey(rabbitAdmin);
        initMsgListener(connectionFactory);
    }

    private void initDeclareExchange(RabbitAdmin rabbitAdmin) {
        if (!CollectionUtils.isEmpty(declareExchangeList)) {
            declareExchangeList.forEach(exchange -> rabbitAdmin.declareExchange(exchange));
        }
    }

    private void initDeclareQueue(RabbitAdmin rabbitAdmin) {
        if (!CollectionUtils.isEmpty(declareQueueList)) {
            declareQueueList.forEach(q -> rabbitAdmin.declareQueue(q));
        }
    }

    private void initBindRoutingKey(RabbitAdmin rabbitAdmin) {
        if (!CollectionUtils.isEmpty(bindRoutingKeyList)) {
            bindRoutingKeyList.forEach((bind) -> {
                if (StringUtils.isNotBlank(bind.getQueueName()) && StringUtils.isNotBlank(bind.getRoutingKey())) {
                    //将queue绑定到exchange
                    rabbitAdmin.declareBinding(new Binding(bind.getQueueName(), Binding.DestinationType.QUEUE, bind.getExchangeName(), bind.getRoutingKey(),
                            Collections.<String, Object>emptyMap())); //声明绑定关系
                }
            });
        }
    }

    private void initMsgListener(ConnectionFactory connectionFactory) {

        this.container = new SimpleMessageListenerContainer(connectionFactory);

        if (null != consumeQueueList) {
            container.setQueueNames(consumeQueueList);
        }

        container.setExposeListenerChannel(true);


        container.setMaxConcurrentConsumers(maxConcurrentConsumers);


        container.setConcurrentConsumers(concurrentConsumers);

        container.setAcknowledgeMode(AcknowledgeMode.AUTO); //设置确认模式自动确认

        container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {

                    byte[] body = message.getBody();
                    try {
                        if (!CollectionUtils.isEmpty(topicObserverMap)) {
                            String msg = new String(body);
                            String routingKey = message.getMessageProperties().getReceivedRoutingKey();
                            topicObserverMap.forEach((topic, msgProcessers) -> {
                                if (RabbitMqTopicUtil.routingKeyMatch(topic, routingKey)) {
                                    if (!CollectionUtils.isEmpty(msgProcessers)) {
                                        msgProcessers.forEach(p -> {
                                            try {
                                                executor.execute(() ->
                                                        p.execute(topic, msg)
                                                );
                                            } catch (Exception e) {

                                            }
                                        });
                                    }
                                }
                            });
                        }
                        log.debug("消费 receive msg : " + new String(body));
                        // 消息的标识，false只确认当前一个消息收到，true确认所有consumer获得的消息
                        //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //手动确认确认消息成功消费
                    } catch (Exception e) {
                        log.warn("消费失败: " + new String(body));
                        // ack返回false，并重新回到队列
                       /* try {
                            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                        } catch (IOException e1) {
                            e1.printStackTrace();

                        }*/
                    }
                }
        );
    }


    @Override
    public void startWork() {
        init();
        this.container.start();
    }

    @Override
    public void stopWork() {
        this.container.stop();
        this.executor.shutdown();
    }

    public static class Builder {  //静态内部类，通过该类来创建一个外部类对象，使用链式调用

        private RabbitMqMsgReceiver target;  //创建一个外部类对象

        /**
         * @param connectionFactory 必填参数
         */
        //         * 构造一个内部类对象，通过该对象设置属性值
        //         * 如果存在必填参数，可以以带参构造函数的方式传过来
        public Builder(ConnectionFactory connectionFactory) {
            target = new RabbitMqMsgReceiver(connectionFactory);
        }

        /**
         * @param maxMsgProcessThreadNum
         * @return
         */
        //         * 调动builder对象的maxMsgProcessThreadNum方法设置maxMsgProcessThreadNum值
        //         * 返回builder对象，用来下一个方法设置属性值
        //         * 这就是链式调用
        //         *
        public Builder maxMsgProcessThreadNum(int maxMsgProcessThreadNum) {
            target.maxMsgProcessThreadNum = maxMsgProcessThreadNum;
            return this;
        }

        public Builder maxConcurrentConsumers(int maxConcurrentConsumers) {
            target.maxMsgProcessThreadNum = maxConcurrentConsumers;
            return this;
        }


        public Builder concurrentConsumers(int concurrentConsumers) {

            target.concurrentConsumers = concurrentConsumers;
            return this;
        }

        //需绑定的路由键
        public Builder bindRoutingKeyList(List<BindRoutingKey> bindRoutingKeyList) {
            target.bindRoutingKeyList = bindRoutingKeyList;
            return this;
        }


        //需声明的队列
        public Builder declareQueueList(List<Queue> declareQueueList) {
            target.declareQueueList = declareQueueList;
            return this;
        }


        //需声明的交换机
        public Builder declareExchangeList(List<Exchange> declareExchangeList) {
            target.declareExchangeList = declareExchangeList;
            return this;
        }


        //消费的队列
        public Builder consumeQueueList(String[] consumeQueueList) {
            target.consumeQueueList = consumeQueueList;
            return this;
        }

        /**
         * @return
         */
        public RabbitMqMsgReceiver build() {
            return target; //返回一个外部类
        }
    }
}
